package com.atguigu.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.BeanToJsonStrFunction;
import com.atguigu.gmall.realtime.beans.UserLoginBean;
import com.atguigu.gmall.realtime.utils.DateFormatUtil;
import com.atguigu.gmall.realtime.utils.DorisUtil;
import com.atguigu.gmall.realtime.utils.KafkaUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by 黄凯 on 2023/7/16 0016 15:41
 *
 * @author 黄凯
 * 永远相信美好的事情总会发生.
 * <p>
 * 用户登录聚合统计(独立用户以及7日回流用户)
 */
public class DwsUserUserLoginWindow {

    public static void main(String[] args) throws Exception {

        //TODO 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);
        //TODO 2.检查点相关的设置
        env.enableCheckpointing(5000L);
        //TODO 3.从kafka的主题中读取数据
        //3.1 声明消费的主题以及消费者组
        String topic = "dwd_traffic_page_log";
        String groupId = "dws_user_user_login_group";
        //3.2 创建消费者对象
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId);
        //3.3 消费数据  封装为流
        DataStreamSource<String> kafkaStrDS
                = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");
        //TODO 4.对流中数据类型进行转换    jsonStr->jsonObj
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaStrDS.map(JSON::parseObject);

        //TODO 5.过滤登录行为
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(
                new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObj) throws Exception {

                        //应保留 uid 不为 null 且 last_page_id 为 null 或 last_page_id 为 login 的浏览记录
                        String uid = jsonObj.getJSONObject("common").getString("uid");
                        String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                        return StringUtils.isNotEmpty(uid)
                                && (StringUtils.isEmpty(lastPageId) || "login".equals(lastPageId));


                    }
                }
        );

        // filterDS.print(">>>");

        //TODO 6.指定Watermark以及提取事件时间字段
        SingleOutputStreamOperator<JSONObject> withWatermarkDS = filterDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<JSONObject>() {


                                    @Override
                                    public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {

                                        return jsonObj.getLong("ts");

                                    }
                                }
                        )

        );

        //TODO 7.按照用户id进行分组
        KeyedStream<JSONObject, String> keyedDS
                = withWatermarkDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("uid"));

        //TODO 8.使用Flink的状态编程  判断是否为独立用户以及回流用户 jsonObj->实体类
        SingleOutputStreamOperator<UserLoginBean> processDS = keyedDS.process(
                new KeyedProcessFunction<String, JSONObject, UserLoginBean>() {

                    private ValueState<String> lastLoginDateState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<String> valueStateDescriptor
                                = new ValueStateDescriptor<>("valueStateDescriptor", String.class);

                        lastLoginDateState = getRuntimeContext().getState(valueStateDescriptor);
                    }


                    @Override
                    public void processElement(JSONObject jsonObj,
                                               KeyedProcessFunction<String, JSONObject, UserLoginBean>.Context ctx,
                                               Collector<UserLoginBean> out) throws Exception {

                        //获取当前用户上次登录日期
                        String lastLoginDate = lastLoginDateState.value();
                        //获取当前登录日期
                        Long ts = jsonObj.getLong("ts");
                        String curLoginDate = DateFormatUtil.toDate(ts);
                        Long uuCt = 0L;
                        Long backCt = 0L;

                        if (StringUtils.isNotEmpty(lastLoginDate)) {

                            //若状态中的末次登陆日期不为 null，进一步判断。
                            if (!lastLoginDate.equals(curLoginDate)) {

                                //如果末次登陆日期不等于当天日期则独立用户数 uuCt 记为 1，并将状态中的末次登陆日期更新为当日，进一步判断。
                                uuCt = 1L;
                                lastLoginDateState.update(curLoginDate);

                                //如果当天日期与末次登陆日期之差大于等于8天则回流用户数backCt置为1。
                                long days = (ts - DateFormatUtil.toTs(lastLoginDate)) / 1000 / 60 / 60 / 24;

                                if (days >= 8) {
                                    backCt = 1L;
                                }


                            }


                        } else {

                            // 如果状态中的末次登陆日期为 null，将 uuCt 置为 1，backCt 置为 0，并将状态中的末次登陆日期更新为当日。
                            uuCt = 1L;
                            lastLoginDateState.update(curLoginDate);

                        }

                        if (!uuCt.equals(0L) || !backCt.equals(0L)) {

                            out.collect(
                                    new UserLoginBean(
                                            "",
                                            "",
                                            "",
                                            backCt,
                                            uuCt,
                                            ts
                                    )
                            );


                        }


                    }
                }
        );

        //TODO 9.开窗
        AllWindowedStream<UserLoginBean, TimeWindow> windowDS = processDS.windowAll(
                TumblingEventTimeWindows.of(Time.seconds(10))
        );

        //TODO 10.聚合操作
        SingleOutputStreamOperator<UserLoginBean> reduceDS = windowDS.reduce(
                new ReduceFunction<UserLoginBean>() {
                    @Override
                    public UserLoginBean reduce(UserLoginBean value1, UserLoginBean value2) throws Exception {

                        value1.setUuCt(value1.getUuCt() + value2.getUuCt());
                        value1.setBackCt(value1.getBackCt() + value2.getBackCt());
                        return value1;

                    }
                },

                new AllWindowFunction<UserLoginBean, UserLoginBean, TimeWindow>() {


                    @Override
                    public void apply(TimeWindow window,
                                      Iterable<UserLoginBean> values,
                                      Collector<UserLoginBean> out) throws Exception {

                        String stt = DateFormatUtil.toYmdHms(window.getStart());
                        String edt = DateFormatUtil.toYmdHms(window.getEnd());
                        String curDate = DateFormatUtil.toPartitionDate(window.getStart());
                        for (UserLoginBean loginBean : values) {
                            loginBean.setStt(stt);
                            loginBean.setEdt(edt);
                            loginBean.setCurDate(curDate);
                            out.collect(loginBean);
                        }

                    }
                }

        );

        //TODO 11.将聚合的结果写到Doris中
        reduceDS.print(">>>");


        /**
         * >>>:4> UserLoginBean(stt=2023-07-15 16:13:00,
         * edt=2023-07-15 16:13:10, curDate=20230715, backCt=0, uuCt=32, ts=1689408783000)
         */
        reduceDS
                .map(new BeanToJsonStrFunction<>())
                .sinkTo(DorisUtil.getDorisSink("dws_user_user_login_window"));

        env.execute();



    }

}
